{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "<p><font size=-1 color=gray>\n",
    "&copy; Copyright 2018 IBM Corp. All Rights Reserved.\n",
    "<p>\n",
    "Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file\n",
    "except in compliance with the License. You may obtain a copy of the License at\n",
    "http://www.apache.org/licenses/LICENSE-2.0\n",
    "Unless required by applicable law or agreed to in writing, software distributed under the\n",
    "License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either\n",
    "express or implied. See the License for the specific language governing permissions and\n",
    "limitations under the License.\n",
    "</font></p>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Ingest Clickstream Events\n",
    "\n",
    "This notebook uses the [Scala](https://www.scala-lang.org/) programming language\n",
    "to interact with IBM Db2 Event Stream. It demonstrates how to:\n",
    "\n",
    "* Connect to Event Store\n",
    "* Drop and create a database\n",
    "* Define a table schema\n",
    "* Drop and create a table\n",
    "* Load a CSV file into a DataFrame\n",
    "* Batch insert from a DataFrame into a table\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Connect to IBM Db2 Event Store\n",
    "\n",
    "### Determine the IP address of your host\n",
    "\n",
    "Obtain the IP address of the host that you want to connect to by running the appropriate command for your operating system:\n",
    "\n",
    "* On Mac, run: `ifconfig`\n",
    "* On Windows, run: `ipconfig`\n",
    "* On Linux, run: `hostname -i`\n",
    "\n",
    "Edit the `HOST = \"XXX.XXX.XXX.XXX\"` value in the next cell to provide the IP address."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "// Set your host IP address\n",
    "val Host = \"XXX.XXX.XXX.XXX\"\n",
    "\n",
    "//Port will be 1100 for version 1.1.2 or later (5555 for version 1.1.1)\n",
    "val Port = \"1100\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import Scala packages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import sys.process._\n",
    "import scala.concurrent.{Await, Future}\n",
    "import scala.concurrent.duration.Duration\n",
    "import collection.JavaConverters._\n",
    "import org.apache.spark.sql.Row\n",
    "import org.apache.spark.sql.types._\n",
    "import org.apache.spark.sql.SparkSession\n",
    "import org.apache.spark.sql.DataFrameReader\n",
    "import spark.implicits._\n",
    "\n",
    "import com.ibm.event.catalog.TableSchema\n",
    "import com.ibm.event.oltp.EventContext\n",
    "import com.ibm.event.example.DataGenerator\n",
    "import com.ibm.event.common.ConfigurationReader\n",
    "import com.ibm.event.oltp.InsertResult"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Connect to Event Store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "ConfigurationReader.setConnectionEndpoints(Host + \":\" + Port)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a database\n",
    "\n",
    "Only one database can be active in Event Store. If you already have a database, you don't need to create one.\n",
    "To create a database in Event Store, you can use the createDatabase function. If you want to drop an existing\n",
    "database to create a new one, use the dropDatabase function first."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "// See the comments and run this cell if you need to DROP and/or CREATE the database.\n",
    "\n",
    "// EventContext.dropDatabase(\"TESTDB\")  // Uncomment this if you want to drop an existing TESTDB\n",
    "val context = EventContext.createDatabase(\"TESTDB\") // Comment this out to re-use an existing TESTDB\n",
    "\n",
    "val error =  context.openDatabase()\n",
    "error.map(e => sys.error(e.toString))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a table\n",
    "\n",
    "### Define the schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "val clickdataSchema = StructType(Array(\n",
    "  StructField(\"eventId\", LongType, false),\n",
    "  StructField(\"eventType\", StringType, false),\n",
    "  StructField(\"timestamp\", StringType, false),\n",
    "  StructField(\"ipaddress\", StringType, false),\n",
    "  StructField(\"sessionId\", StringType, false),\n",
    "  StructField(\"userId\", StringType, false),\n",
    "  StructField(\"pageUrl\", StringType, false),\n",
    "  StructField(\"browser\", StringType, false)))\n",
    "\n",
    "// Define Table schema for clickstream data\n",
    "val clickStreamSchema = TableSchema(\n",
    "  \"ClickStreamTable\", clickdataSchema, Array(\"eventId\"), Array(\"eventId\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create the Table\n",
    "If you want to drop the existing table to create a new one, use the dropTable function first."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "// Create the table - skip if table is already created\n",
    "\n",
    "// var res = context.dropTable(clickStreamSchema.tableName)  // Uncomment to drop existing table\n",
    "var res = context.createTable(clickStreamSchema)\n",
    "if (res.isDefined) {\n",
    "  println(s\"Error while creating table ${clickStreamSchema.tableName}\\n: ${res.get}\")\n",
    "} else {\n",
    "  println(s\"Table ${clickStreamSchema.tableName} successfully created.\")\n",
    "}"
   ]
  },
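  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `createTable` result above is checked with `isDefined`, which suggests that these calls return an `Option` that is empty on success. As a minimal sketch of that pattern in plain Scala, the helper below turns such a result into a status message. The name `report` and the sample error text are hypothetical and are not part of the Event Store API.\n",
    "\n",
    "```scala\n",
    "// Hypothetical helper: None means the step succeeded, Some(e) carries the failure.\n",
    "def report(step: String, result: Option[String]): String = result match {\n",
    "  case Some(e) => s\"$step failed: $e\"\n",
    "  case None    => s\"$step succeeded\"\n",
    "}\n",
    "\n",
    "println(report(\"openDatabase\", None))\n",
    "println(report(\"createTable\", Some(\"sample error text\")))\n",
    "```\n",
    "\n",
    "Matching on the `Option` ensures that a success message is printed only when no error is present."
   ]
  },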
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "val clickstreamTable = context.getTable(\"ClickStreamTable\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load data from the CSV file to a DataFrame\n",
    "Use the `add data assets` in the UI to make the file available to the notebook.\n",
    "Then read the file from the assets directory into a DataFrame."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "// Initialize spark session\n",
    "val spark: SparkSession = SparkSession.builder().getOrCreate()\n",
    "val clickStreamDF = spark.read.option(\"header\", \"true\").option(\"inferSchema\", false).schema(clickdataSchema).csv(\"assets/clickstream_data.csv\")\n",
    "clickStreamDF.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load data from the DataFrame to the table\n",
    "Use the batchInsert function to load the data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "// Iteratively Insert rows in batch\n",
    "val iter = clickStreamDF.toLocalIterator() \n",
    "val error = context.batchInsert(clickstreamTable, iter.asScala)\n",
    "if (error.isDefined) {\n",
    "  System.err.println(error)\n",
    "}\n",
    "println(\"Ingest completed successfully\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - Scala",
   "language": "scala",
   "name": "apache_toree_scala"
  },
  "language_info": {
   "file_extension": ".scala",
   "name": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
